package com.lwq.fast.log.fastlogserver.redis.listener;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.lwq.fast.log.fastlogcore.constant.Constants;
import com.lwq.fast.log.fastlogcore.entity.Message;
import com.lwq.fast.log.fastlogcore.util.ThreadPoolUtil;
import com.lwq.fast.log.fastlogserver.es.service.ElasticSearchService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;


/**
 * Message listener that consumes log messages from a Redis list and forwards
 * them to Elasticsearch, either one at a time or in batches.
 *
 * <p>The listener is only registered when the property {@code active.client}
 * has the value {@code redis}.
 *
 * @author 刘文强
 */
@Configuration
@ConditionalOnProperty(name = "active.client", havingValue = "redis")
@Slf4j
public class RedisConsumerListener {

    /** Pause after a failed poll so an unreachable Redis does not cause a busy loop of errors. */
    private static final long ERROR_BACKOFF_MS = 1000L;

    @Autowired
    private JedisPool jedisPool;
    /** Timeout handed to the blocking pop (BRPOP) on the Redis log list. */
    @Value("${redis.sleep.timeout}")
    private int timeOut;
    /** Whether messages are buffered and written in batches instead of one by one. */
    @Value("${elasticsearch.batch.insert}")
    private boolean enableBatchInsert;
    /** Number of buffered messages that triggers a batch write. */
    @Value("${elasticsearch.batch.size}")
    private Integer batchSize;
    /** Maximum age (milliseconds) of a non-empty batch before it is written. */
    @Value("${elasticsearch.batch.await}")
    private Long awaitMs;

    @Autowired
    private ElasticSearchService elasticSearchService;

    /**
     * Starts the background consumer by submitting the polling loop to the
     * executor obtained from {@link ThreadPoolUtil#getThreadPoolExecutor()}.
     * Returns immediately; the loop itself runs on the executor thread.
     */
    @Bean
    public void onMessage(){
        ThreadPoolExecutor threadPoolExecutor = ThreadPoolUtil.getThreadPoolExecutor();
        threadPoolExecutor.execute(this::consumeLoop);
    }

    /**
     * Polls the Redis list until the thread is interrupted.
     *
     * <p>In batch mode every popped message is appended to the buffer first and
     * the buffer is flushed afterwards once it holds {@code batchSize} entries or
     * its oldest entry has waited {@code awaitMs} milliseconds, so the message
     * that triggers a flush is never lost. A poll that times out without a
     * message also flushes an overdue buffer.
     */
    private void consumeLoop() {
        List<String> batchMsg = new ArrayList<>();
        long batchStart = System.currentTimeMillis();
        while (!Thread.currentThread().isInterrupted()) {
            // Acquiring the connection inside the try keeps the loop alive when Redis is unavailable.
            try (Jedis jedis = jedisPool.getResource()) {
                List<String> popped = jedis.brpop(timeOut, Constants.REDIS_LOG_KEY);
                // BRPOP replies with [key, value]; only the second element is the payload.
                String msg = (popped != null && popped.size() >= 2) ? popped.get(1) : null;

                if (msg != null) {
                    if (!enableBatchInsert) {
                        // Batch insert disabled: write straight through.
                        singleInsert(msg);
                        continue;
                    }
                    if (batchMsg.isEmpty()) {
                        // The wait window starts with the first buffered message.
                        batchStart = System.currentTimeMillis();
                    }
                    batchMsg.add(msg);
                }

                if (!batchMsg.isEmpty()) {
                    long waited = System.currentTimeMillis() - batchStart;
                    if (batchMsg.size() >= batchSize || waited >= awaitMs) {
                        batchInsert(batchMsg);
                        batchMsg.clear();
                    }
                }
            } catch (Exception e) {
                log.error("RedisConsumerListener onMessage error ", e);
                try {
                    Thread.sleep(ERROR_BACKOFF_MS);
                } catch (InterruptedException interrupted) {
                    // Restore the flag so the loop condition ends the consumer.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Parses each raw message into a {@link Message} and writes all successfully
     * parsed messages to Elasticsearch in one batch call.
     *
     * <p>Messages that cannot be parsed are discarded; a failing batch write is
     * logged and not retried.
     *
     * @param msgList raw JSON messages popped from Redis
     */
    private void batchInsert(List<String> msgList) {
        List<Message> messageList = new ArrayList<>(msgList.size());
        for (String msg : msgList) {
            try{
                Message message = JSON.parseObject(msg, Message.class);
                if (ObjectUtil.isNotNull(message)){
                    messageList.add(message);
                }
            }catch (Exception e){
                // Deliberately best-effort: a malformed message is dropped, but leave a trace.
                log.debug("Discarding message that cannot be parsed as Message: {}", msg, e);
            }
        }
        if (CollectionUtil.isNotEmpty(messageList)){
            try{
                elasticSearchService.logMessage2EsBatch(messageList);
            }catch (Exception e){
                log.error("批量写入数据到Es出错",e);
            }
        }
    }

    /**
     * Validates that the raw message parses into a {@link Message} and, if so,
     * writes the raw message to Elasticsearch.
     *
     * <p>Messages that cannot be parsed are discarded; a failing write is logged
     * and not retried.
     *
     * @param msg raw JSON message popped from Redis
     */
    private void singleInsert(String msg) {
        Message message = null;
        try{
            message = JSON.parseObject(msg, Message.class);
        }catch (Exception formatException){
            log.debug("Discarding message that cannot be parsed as Message: {}", msg, formatException);
        }

        // A message that cannot be converted into a Message is dropped.
        if (ObjectUtil.isNotNull(message)){
            try{
                elasticSearchService.logMessage2Es(msg);
            }catch (Exception e){
                log.error("写入数据到Es出错",e);
            }
        }
    }

}
